Cost problem:¶
Path chosen by UCS: A - S - F - B => 138 + 41 + 112 = 291
Alternate path: A - S - RV - P - B => 138 + 49 + 29 + 67 = 283
We can make our algorithm to choose a cheaper path, by making better choice of nodes each time we choose one from the bag, the priority queue openSet
Approach¶
Now we are clear the cost is the determining factor in choosing the next node. If we could add some sense to the gScore, about how we are faring in proceeding towards the goal, that could help to improve the choice. That is,
new_cost = total_cost_from_start_to_that_node + a value representative of how we are faring towards the goal direction
Since its addition, we could think, in terms of same unit, that is, distance itself. In a space, the shortest possible distance between two points, is the straight line distance (SLD) between them. Any path we are going to deduce, will have a cost, always greater to that.
When we are at any node, the shortest virtual straight line distance from that node to goal, could then always help us choose a better node.
Let us call it heuristic score hScore or h(n). Let the new cost be fScore or f(n). Then
fScore = gScore + hScore
If there are 3 neighbors to add to the queue, whichever neighbor having least sum of gScore and hScore, that is fScore could be our next best candidate. Why?
Least gScore tells us, that neighbor took least cost to arrive at that point among others. Least hScore tells us, that neighbor is roughly possibly nearer to the goal than others.
Since we are including only hScore for comparision w.r.t how distant they are from the goal for all nodes, and since that could be the minimum distance possible, we could be sure that, our comparision is fair for all nodes, in our scenario.
Note:
Remember, hScore is kinda approximation. And if costs include more factors, hScore could screw up also. SLD may not always be best hScore in all scenarios. Thus there are multiple ways to calculate hScore. Even SLD approach includes various methods like Euclidean, Manhattan etc. We stick with geodesic SLD for now.
It is also important, hScore should not over estimate actual cost as much as possible. Then, we might not be making a fair comparision between the fScores of nodes.
Heuristics is a deep subject by itself, so for now, we will move on with basics. There could never be an end to tuning an algorithm.
Algorithm Development¶
Calculating cost is the primary change now. At every node, calculate a new total cost as below and store that in the queue/bag.
# f(n) = g(n) + h(n)
# g(n) = total actual SLD so far from 'start' to 'current neighbor'
# h(n) = SLD from current neighbor to 'goal'
gScore
In UCS, we used g(n) as below.
g(n) or gScore of each_neighbor = current_cost from start to current_node + distance(current_node, each_neighbor)
The current_cost was coming directly from queue, because that was what we stored for every node in first place. Now, we are going to store a different cost fScore or f(n) in the queue. So we would need a function to calculate the current_cost value.
We could use cameFrom
to track back the nodes till start, and at every iteration, simply calculate the distance and sum up step costs to arrive at gScore. Thus this would be just amalgamation of reconstructPath
and distance
we already saw.
from docHelpers_ipython import romania_location_map
import geopy
from math import floor
# TEST SETUP
M = romania_location_map
start = 'A'
cameFrom = {}
cameFrom[start] = None
for each_parent in M.keys():
for each_child in M[each_parent]['connections']:
cameFrom[each_child] = each_parent
def distance(start_node, end_node):
(x0,y0) = M[start_node]['pos']
(x1,y1) = M[end_node]['pos']
return floor(geopy.distance.geodesic((y0,x0), (y1,x1)).miles)
# NEEDED FUNCTION TO CALCULATE DISTANCE FROM CURRENT NODE BACK TO START
def totalDistance(current_node):
cost = 0
while current_node is not None:
parent = cameFrom.get(current_node, None)
if parent is not None:
cost += distance(current_node, parent)
current_node = parent
return cost
print(totalDistance('F')) # 138 + 41 = 179
print(totalDistance('C')) # 138 + 49 + 29 + 64 = 280
You might have manually arrived at a different value for 'C'. Note 'C' could be arrived at via multiple routes due to cycles in the graph and our function calculated just how cameFrom was constructed in test setup just above, which in turn depends on how our map description M was.
This is an important insight. In our algorithm, this later would make an improvisation. But for now, remember, cameFrom
in our algorithm depends on how nodes were traversed, and thus totalDistance
depends on that. That is ok for now to proceed.
hScore
This heuristic score, so as to have same 'weightage' as the gScore, it would be summed up with, shall also be a virtual SLD from neighbor to the goal for reasons we discussed earlier. So programmatically it is simply calling distance function with both nodes (neighbor and goal)
Start
Let us initialize as usual..same as UCS here..
openSet = { (0,'A') }
closedSet = ( 'A' )
cameFrom[ 'A' ] = None
Coding..
from queue import PriorityQueue
from docHelpers_ipython import romania_location_map
cameFrom = { }
openSet = PriorityQueue()
M = romania_location_map
start = 'A'
goal = 'B'
openSet.put((0,start))
closedSet = set(start)
cameFrom[start] = None
ITERATION 1
Is openSet empty? No. So Go on. Take least cost path from openSet {(0, 'A')} : (0, 'A') Current node: 'A'. Kids: 'S','T','Z' Is 'A' the goal? No. So Go on.closedSet : ( 'A' )
'A' to 'S':
past cost from 'A' to 'S' : 138
heuristic cost from 'S' to 'B' : 135
total : 273
closedSet : ( 'A', 'S' ) 'A' to 'T':
past cost from 'A' to 'T' : 30
heuristic cost from 'T' to 'B' : 256
total : 286
closedSet : ( 'A', 'S', 'T' )
'A' to 'Z':
past cost from 'A' to 'Z' : 31
heuristic cost from 'Z' to 'B' : 269
total : 300
closedSet : ( 'A', 'S', 'T', 'Z' )Result: openSet = { (273,'S'), (286,'T'), (300,'Z') }
if not openSet.empty(): # Note unike deque, we check differently using empty() if our bag is empty or not
current_cost, current_node = openSet.get() # Note, we 'get()' now and it returns a 'set', a tuple
if current_node is not goal:
all_neighbors = M.get(current_node,[]).get('connections',[])
for each_neighbor in all_neighbors: # each neighbor is a 'set' having a cost and node itself
if each_neighbor not in closedSet:
#NOTE: fScore = gScore + hScore
gScore = totalDistance(current_node) + distance(current_node,each_neighbor)
hScore = distance(each_neighbor, goal)
fScore = gScore + hScore
openSet.put((fScore, each_neighbor))
cameFrom[each_neighbor] = current_node
closedSet.add(each_neighbor)
print(list(openSet.queue))
print(list(closedSet))
Visualizing..(do not bother about order of added nodes)
from docHelpers_ipython import Doc
from IPython.core.display import HTML
doc = Doc(M)
resultHTML = doc.computeGraphs('A',['T','S','Z'], mappy=True, tree=True, queue=True, HTML=True)
HTML(resultHTML)
ITERATION 2
Is openSet empty? No. So Go on. Take least cost path from openSet { (273,'S'), (286,'T'), (300,'Z')** } : (273,'S') Current node: 'S'. Kids: 'F', 'RV', 'O'
Is 'S' the goal? No. So Go on.closedSet : ( 'A', 'S', 'T', 'Z' )
'S' to 'F':
past cost from 'A' to 'F' : 138+41
heuristic cost from 'F' to 'B' : 112
total : 291 closedSet : ( 'A', 'S', 'T', 'Z' ,'F' )
'S' to 'RV':
past cost from 'A' to 'RV' : 138+49
heuristic cost from 'RV' to 'B' : 97
total : 284
closedSet : ( 'A', 'S', 'T', 'Z', 'F', 'RV' )
'S' to 'O':
past cost from 'A' to 'O' : 138+136
heuristic cost from 'O' to 'B' : 271
total : 545
closedSet : ( 'A', 'S', 'T', 'Z', 'F', 'RV', 'O' )Result: openSet = { (286,'T'), (300,'Z'), (291,'F'), (284,'RV'), (545,'O') }
if not openSet.empty(): # Note unike deque, we check differently using empty() if our bag is empty or not
current_cost, current_node = openSet.get() # Note, we 'get()' now and it returns a 'set', a tuple
if current_node is not goal:
all_neighbors = M.get(current_node,[]).get('connections',[])
for each_neighbor in all_neighbors: # each neighbor is a 'set' having a cost and node itself
if each_neighbor not in closedSet:
#NOTE: fScore = gScore + hScore
gScore = totalDistance(current_node) + distance(current_node,each_neighbor)
hScore = distance(each_neighbor, goal)
fScore = gScore + hScore
openSet.put((fScore, each_neighbor))
cameFrom[each_neighbor] = current_node
closedSet.add(each_neighbor)
print(list(openSet.queue))
print(list(closedSet))
resultHTML = doc.computeGraphs('S',['F','RV','O'], mappy=True, tree=True, queue=True, HTML=True)
HTML(resultHTML)
ITERATION 3
Is openSet empty? No. So Go on. Take least cost path from openSet { (286,'T'), (300,'Z'), (291,'F'), (284,'RV'), (336,'O') } : (284,'RV') Current node: 'RV'. Kids: 'C', 'P'
Is 'RV' the goal? No. So Go on.closedSet : ( 'A', 'S', 'T', 'Z', 'F', 'RV', 'O' )
'RV' to 'C':
past cost from 'A' to 'C' : 138+49+60
heuristic cost from 'C' to 'B' : 114
total : 361 closedSet : ( 'A', 'S', 'T', 'Z', 'F', 'RV', 'O', 'C' ) 'RV' to 'P':
past cost from 'A' to 'P' : 138+49+29
heuristic cost from 'P' to 'B' : 67
total : 283
closedSet : ( 'A', 'S', 'T', 'Z', 'F', 'RV', 'O', 'C', 'P' )Result: openSet = { (286,'T'), (300,'Z'), (291,'F'), (545,'O'), (361,'C'), (283,'P') }
if not openSet.empty(): # Note unike deque, we check differently using empty() if our bag is empty or not
current_cost, current_node = openSet.get() # Note, we 'get()' now and it returns a 'set', a tuple
if current_node is not goal:
all_neighbors = M.get(current_node,[]).get('connections',[])
for each_neighbor in all_neighbors: # each neighbor is a 'set' having a cost and node itself
if each_neighbor not in closedSet:
#NOTE: fScore = gScore + hScore
gScore = totalDistance(current_node) + distance(current_node,each_neighbor)
hScore = distance(each_neighbor, goal)
fScore = gScore + hScore
openSet.put((fScore, each_neighbor))
cameFrom[each_neighbor] = current_node
closedSet.add(each_neighbor)
print(list(openSet.queue))
print(list(closedSet))
resultHTML = doc.computeGraphs('RV',['C','P'], mappy=True, tree=True, queue=True, HTML=True)
HTML(resultHTML)
ITERATION 4
Is openSet empty? No. So Go on. Take least cost path from openSet { (286,'T'), (300,'Z'), (291,'F'), (336,'O'), (361,'C'), (283,'P') } : (283,'P') Current node: 'P'. Kids: 'B' ('C' ignored as already visited)
Is 'P' the goal? No. So Go on.closedSet : ( 'A', 'S', 'T', 'Z', 'F', 'RV', 'O', 'C', 'P' )
'P' to 'B':
past cost from 'A' to 'B' : 138+49+29+67
heuristic cost from 'B' to 'B' : 0
total : 283 closedSet : ( 'A', 'S', 'T', 'Z', 'F', 'RV', 'O', 'C', 'P', 'B' )Result: openSet = { (286,'T'), (300,'Z'), (291,'F'), (545,'O'), (361,'C'), (283,'B') }
if not openSet.empty(): # Note unike deque, we check differently using empty() if our bag is empty or not
current_cost, current_node = openSet.get() # Note, we 'get()' now and it returns a 'set', a tuple
if current_node is not goal:
all_neighbors = M.get(current_node,[]).get('connections',[])
for each_neighbor in all_neighbors: # each neighbor is a 'set' having a cost and node itself
if each_neighbor not in closedSet:
#NOTE: fScore = gScore + hScore
gScore = totalDistance(current_node) + distance(current_node,each_neighbor)
hScore = distance(each_neighbor, goal)
fScore = gScore + hScore
openSet.put((fScore, each_neighbor))
cameFrom[each_neighbor] = current_node
closedSet.add(each_neighbor)
print(list(openSet.queue))
print(list(closedSet))
resultHTML = doc.computeGraphs('P',['B'], mappy=True, tree=True, queue=True, HTML=True)
HTML(resultHTML)
ITERATION 5
Is openSet empty? No. So Go on. Take least cost path from openSet { (286,'T'), (300,'Z'), (291,'F'), (545,'O'), (361,'C'), (283,'B') } : (283,'B') Current node: 'B'. Kids: 'G', 'U' **Is 'B' the goal? YES. So return.**
if not openSet.empty(): # Note unike deque, we check differently using empty() if our bag is empty or not
current_cost, current_node = openSet.get() # Note, we 'get()' now and it returns a 'set', a tuple
if current_node is goal:
print('Yes. Success. Current node: ', current_node)
if current_node is not goal:
all_neighbors = M.get(current_node,[]).get('connections',[])
for each_neighbor in all_neighbors: # each neighbor is a 'set' having a cost and node itself
if each_neighbor not in closedSet:
#NOTE: fScore = gScore + hScore
gScore = totalDistance(current_node) + distance(current_node,each_neighbor)
hScore = distance(each_neighbor, goal)
fScore = gScore + hScore
openSet.put((fScore, each_neighbor))
cameFrom[each_neighbor] = current_node
closedSet.add(each_neighbor)
print(list(openSet.queue))
print(list(closedSet))
resultHTML = doc.computeGraphs('B',['G','U'], mappy=True, tree=True, queue=True, HTML=True)
HTML(resultHTML)
Done :) We not only found the route, we found the cheapest route from 'A' to 'B'
Let us modularize as a proper function like BFS/DFS/UCS and check again..
from queue import PriorityQueue
from docHelpers_ipython import romania_location_map
from math import floor
import geopy
cameFrom = { }
# NEEDED FUNCTION TO CALCULATE DISTANCE FROM CURRENT NODE BACK TO START
def totalDistance(current_node):
cost = 0
while current_node is not None:
parent = cameFrom.get(current_node, None)
if parent is not None:
cost += distance(current_node, parent)
current_node = parent
return cost
def distance(start_node, end_node):
(x0,y0) = M[start_node]['pos']
(x1,y1) = M[end_node]['pos']
return floor(geopy.distance.geodesic((y0,x0), (y1,x1)).miles)
def reconstructPath(current_node):
Path = [current_node]
while current_node is not None:
current_node = cameFrom[current_node]
Path.append(current_node)
return reversed(Path[:-1])
def AStar(start, goal):
# INITIALIZATION
openSet = PriorityQueue()
openSet.put((0,start))
closedSet = set(start)
cameFrom[start] = None
# MAIN LOOP
while not openSet.empty():
current_cost, current_node = openSet.get()
if current_node is goal:
print('Success. Route from {} to {} found. Cost: {} Path: {}'.format(start,goal,current_cost,list(reconstructPath(current_node))))
break
all_neighbors = M.get(current_node,[]).get('connections',[])
for each_neighbor in all_neighbors:
if each_neighbor not in closedSet:
#gScore = 'cost till current_node' + 'cost between current node and its neighbor'
gScore = totalDistance(current_node) + distance(current_node,each_neighbor)
hScore = distance(each_neighbor, goal)
fScore = gScore + hScore
openSet.put((fScore, each_neighbor))
cameFrom[each_neighbor] = current_node
closedSet.add(each_neighbor)
#print(openSet.queue) #just to verify
return 'No Goal found!'
M = romania_location_map
start = 'A'
goal = 'B'
result = AStar(start, goal)
Visualization (Optional)¶
We will inject a small code to keep track of bag contents, nodes traversed for sake of visualization. And then render an animation to visualize how nodes were traversed to reach the goal.
from queue import PriorityQueue
from docHelpers_ipython import romania_location_map
from math import floor
import geopy
# VISUALIZATION PURPOSE
from docHelpers_ipython import Doc
from IPython.display import display, Image
cameFrom = { }
# NEEDED FUNCTION TO CALCULATE DISTANCE FROM CURRENT NODE BACK TO START
def totalDistance(current_node):
cost = 0
while current_node is not None:
parent = cameFrom.get(current_node, None)
if parent is not None:
cost += distance(current_node, parent)
current_node = parent
return cost
def distance(start_node, end_node):
(x0,y0) = M[start_node]['pos']
(x1,y1) = M[end_node]['pos']
return floor(geopy.distance.geodesic((y0,x0), (y1,x1)).miles)
def reconstructPath(current_node):
Path = [current_node]
while current_node is not None:
current_node = cameFrom[current_node]
Path.append(current_node)
return reversed(Path[:-1])
def AStar(start, goal):
# INITIALIZATION
openSet = PriorityQueue()
openSet.put((0,start))
closedSet = set(start)
cameFrom[start] = None
# MAIN LOOP
while not openSet.empty():
current_cost, current_node = openSet.get()
if current_node is goal:
print('Success. Route from {} to {} found. Cost: {} Path: {}'.format(start,goal,current_cost,list(reconstructPath(current_node))))
break
# VISUALIZATION PURPOSE
all_neighbors = reversed(M.get(current_node,[]).get('connections',[]))
considered_neighbors = list(set(all_neighbors) - set(closedSet)) # thank you: https://stackoverflow.com/questions/3462143/get-difference-between-two-lists
all_neighbors = M.get(current_node,[]).get('connections',[])
for each_neighbor in all_neighbors:
if each_neighbor not in closedSet:
#gScore = 'cost till current_node' + 'cost between current node and its neighbor'
gScore = totalDistance(current_node) + distance(current_node,each_neighbor)
hScore = distance(each_neighbor, goal)
fScore = gScore + hScore
openSet.put((fScore, each_neighbor))
cameFrom[each_neighbor] = current_node
closedSet.add(each_neighbor)
# VISUALIZATION PURPOSE
_ = doc.computeGraphs(current_node, considered_neighbors)
#print(openSet.queue) #just to verify
# VISUALIZATION PURPOSE
_ = doc.computeGraphs(current_node, [])
return 'No Goal found!'
M = romania_location_map
start = 'A'
goal = 'B'
# VISUALIZATION PURPOSE - called here caz we use doc in ourSearchAlgo
doc = Doc(M)
result = AStar(start, goal)
# VISUALIZATION PURPOSE
images = doc.render()
display(Image(images[0]),Image(images[1]),Image(images[2]))
Ha Ha, Isnt that awesome? Algorithm is traversing towards goal, as if it already knows the path, and still ends up with the cheapest path.
UCS vs AStar Visual Comparison:¶
Take a moment to compare the UCS and Astar, we just finished.
While AStar might appear as if it got lucky like DFS, it is not completely reliant on luck like DFS (remember we 'hacked' a little in our strategy to reverse feed). Here we get to tune the luck with heuristic function which is much more reliant, and above all, our node evaluation depends on cost. Thus AStar is able to promise cheapest path possible in the graph.
That's it?¶
There is still a small trap ahead. Try from 'A' to 'U' (over to next section for solving this). You will not get cheapest path